vendredi 24 juin 2016

Spring integration: Why JsonParseException occur when you send a message to a specific ip?

The message came as a queueChannel using TcpNioClientConnectionFactory attempts to send to a specific IP.

Adapter is ...

@Bean
public AbstractConnectionFactory plainClientFactory() {
    int port = Integer.parseInt(outboundPort);
    TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(outboundHost, port);
    factory.setTaskExecutor(taskSchedulerWithPlain());
    factory.setLookupHost(false);

    factory.setSerializer(echoSerializer);
    factory.setDeserializer(echoSerializer);

    // Nagle's algorithm disabled
    factory.setSoTcpNoDelay(true);

    return factory;
}

@Bean
public TcpSendingMessageHandler plainClientHandler() {
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    handler.setConnectionFactory(plainClientFactory());

    return handler;
}

@Bean
public Executor taskSchedulerWithPlain() {
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setPoolSize(PROCESS_SIZE);
    scheduler.setThreadFactory(plainAffinityThreadFactory());
    scheduler.setWaitForTasksToCompleteOnShutdown(true);

    return scheduler;
}

Config is ...

@Resource(name = "sslClientHandler")
TcpSendingMessageHandler sslClientHandler;

@Bean
public IntegrationFlow flowForConvertingSslJsonToBytesAndSendClient() {
    return IntegrationFlows.from(outputWithSslJsonBytesToClient())
            .transform(new ObjectToJsonTransformer())
            .transform(new PayloadSerializingTransformer())
            .handle(INBOUND_SERVICE, ATTACH_HEADER).handle(sslClientHandler).get();
}

@Bean
public MessageChannel outputWithSslJsonBytesToClient() {
    return MessageChannels.queue(POOL_SIZE).get();
}

InboundServiceImpl is ...

@Override
public Object extractPayloadAsJson(byte[] message) throws Exception {
    log.debug("receive bytes... {} bytes", message.length);

    StringBuffer sb = new StringBuffer();
    for (byte b : message) {
        sb.append(b);
    }

    log.debug("extractPayloadAsJson message to string : {}", sb.toString());

    int payloadSize = message.length - EXPECTED_HEADER_SIZE;

    byte[] payload = new byte[payloadSize];
    byte[] header = new byte[EXPECTED_HEADER_SIZE];

    System.arraycopy(message, EXPECTED_HEADER_SIZE, payload, 0, payloadSize);
    log.debug("extract json... {} bytes", payload.length);

    System.arraycopy(message, 0, header, 0, EXPECTED_HEADER_SIZE);

    ObjectMapper mapper = new ObjectMapper();

    HashMap<String, Object> payloadAsMap = mapper.readValue(payload, HashMap.class);
    log.debug("convert map... {}", payloadAsMap.entrySet());

    return payloadAsMap;
}


@Override
public byte[] attachHeader(byte[] payload) throws Exception {
    byte[] jsonFlag = ByteBuffer.allocate(HEADER_SIZE_JSON).putShort((short) 0).array();
    byte[] crcWithoutJsonFlag = new byte[HEADER_SIZE_CRC_WITHOUT_JSON_FLAG];
    byte[] rcmd = new byte[HEADER_SIZE_RCMD];
    byte[] packetSize = ByteBuffer.allocate(HEADER_PACKET_SIZE).putInt(payload.length).array();

    byte[] concat = Bytes.concat(jsonFlag, crcWithoutJsonFlag, rcmd, packetSize, payload);

    log.debug("concat {} bytes", concat.length);
    log.debug("concat to string : {}", sb.toString());

    return concat;
}

However, an error occurs when you try to transfer data ...
Why does this error to occur?

ERROR 5432 --- [task-scheduler-5] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('?' (code 172)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: [B@5dc435b4; line: 1, column: 2]

DEBUG 5432 --- [task-scheduler-5] o.s.integration.channel.QueueChannel     : postReceive on channel 'inputWithPlainJson', message: GenericMessage [payload=byte[201], headers={ip_address=192.168.5.97, id=eb9a314a-b767-6ad6-137f-3c4a2bb90f15, ip_hostname=192.168.5.97, ip_tcp_remotePort=49811, ip_connectionId=192.168.5.97:49811:5001:676813de-72f7-46ea-92fc-a902dc7bcb39, timestamp=1466471813736}]
DEBUG 5432 --- [task-scheduler-5] o.s.i.endpoint.PollingConsumer           : Poll resulted in Message: GenericMessage [payload=byte[201], headers={ip_address=192.168.5.97, id=eb9a314a-b767-6ad6-137f-3c4a2bb90f15, ip_hostname=192.168.5.97, ip_tcp_remotePort=49811, ip_connectionId=192.168.5.97:49811:5001:676813de-72f7-46ea-92fc-a902dc7bcb39, timestamp=1466471813736}]
DEBUG 5432 --- [task-scheduler-5] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@1da1de6] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: GenericMessage [payload=byte[201], headers={ip_address=192.168.5.97, id=eb9a314a-b767-6ad6-137f-3c4a2bb90f15, ip_hostname=192.168.5.97, ip_tcp_remotePort=49811, ip_connectionId=192.168.5.97:49811:5001:676813de-72f7-46ea-92fc-a902dc7bcb39, timestamp=1466471813736}]
DEBUG 5432 --- [task-scheduler-5] c.m.j.e.s.service.InboundServiceImpl     : receive bytes... 201 bytes
DEBUG 5432 --- [task-scheduler-5] c.m.j.e.s.service.InboundServiceImpl     : extractPayloadAsJson message to string : SUCCESS
DEBUG 5432 --- [task-scheduler-5] c.m.j.e.s.service.InboundServiceImpl     : extract json... 189 bytes
DEBUG 5432 --- [task-scheduler-5] o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('?' (code 172)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: [B@5dc435b4; line: 1, column: 2], headers={id=872c4b3f-f2b4-3213-41c8-71e65f983390, timestamp=1466471815642}]
DEBUG 5432 --- [task-scheduler-5] o.s.integration.handler.LoggingHandler   : (inner bean)#41bec956 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('?' (code 172)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: [B@5dc435b4; line: 1, column: 2], headers={id=872c4b3f-f2b4-3213-41c8-71e65f983390, timestamp=1466471815642}]
ERROR 5432 --- [task-scheduler-5] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('?' (code 172)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: [B@5dc435b4; line: 1, column: 2]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78)
at org.springframework.integration.dsl.support.BeanNameMessageProcessor.processMessage(BeanNameMessageProcessor.java:57)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:74)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

payload is ...

[payload={"result":"success","reason":0,"pushTargetList":["dhjung","hiryu","hjoh","hslee","hslee1","jhbae","jslee1","khhwang","permedia","test_uid","wjchoi1","ysahn"],"response":"pushTarget"}]

header is ...

[header={json__ContentTypeId__=class java.lang.String, json__TypeId__=class java.util.HashMap, ip_tcp_remotePort=61036, ip_connectionId=192.168.3.96:61036:5001:e43b15c1-14b8-4694-b7ff-3bd8d5bb9379, ip_address=192.168.3.96, id=ad405ee4-f858-d7f0-ba7d-8a7ae25cc8a5, json__KeyTypeId__=class java.lang.String, contentType=application/json, ip_hostname=192.168.3.96, timestamp=1466476432549}]

Aucun commentaire:

Enregistrer un commentaire